/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hylanda.processors.entityname;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hylanda.entity.keyword.HLKeywordProcess;
import com.hylanda.entity.loc.HLLocationProcess;
import com.hylanda.entity.name.HLNameProcess;
import com.hylanda.tools.Pair;
import org.apache.commons.lang.StringUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ShowFieldStyle;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;

@Tags({ "实体名识别" })
@CapabilityDescription("实体名识别")
@SeeAlso()
@ReadsAttributes({ @ReadsAttribute(attribute = "") })
@WritesAttributes({ @WritesAttribute(attribute = "") })
public class HLEntityNameProcessor extends AbstractProcessor {

	// Fallback paths used when neither a processor property nor hl.properties provides one.
	private static final String DEFAULT_SEG_PATH = "/usr/local/nifi/seglib/";
	private static final String DEFAULT_CONFIG_PATH = "/usr/local/nifi/common/";
	private static final String DEFAULT_LOC_PATH = "/usr/local/nifi/locdict/";

	// Recognition engines: locations, person/organization names, keywords.
	private HLLocationProcess hllocation = new HLLocationProcess();
	private HLNameProcess hlname = new HLNameProcess();
	private HLKeywordProcess hlkeyword = new HLKeywordProcess();

	/**
	 * Accepts arbitrary user-added (dynamic) properties; values must be non-empty.
	 */
	@Override
	protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
		PropertyDescriptor.Builder propertyBuilder = new PropertyDescriptor.Builder().name(propertyDescriptorName)
				.required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true);
		return propertyBuilder.build();
	}

	// --- Person-name recognition group -------------------------------------------------------
	private static final PropertyDescriptor FIELD_PROPERTY_NAME = new PropertyDescriptor.Builder()
			.group("________以下是【人名识别】配置组________")
			.name("deal_fields_name").displayName("处理人名字段").description("设置要识别人名的字段").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.showField(ShowFieldStyle.CHECK.toString())
			.build();

	// Person-name results matching any of these words are dropped.
	private static final PropertyDescriptor BLACK_WORDS_PROPERTY = new PropertyDescriptor.Builder().name("black_words")
			.displayName("黑名单").description("需要过滤的人名识别结果").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID).build();

	// Words that must always be recognized as person names.
	private static final PropertyDescriptor WHITE_WORDS_PROPERTY = new PropertyDescriptor.Builder().name("white_words")
			.displayName("白名单").description("指定必须识别人名").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID).build();

	// --- Organization-name recognition group -------------------------------------------------
	private static final PropertyDescriptor FIELD_PROPERTY_ORG = new PropertyDescriptor.Builder()
			.group("________以下是【机构名识别】配置组________")
			.name("deal_fields_org")
			.displayName("处理机构名字段").description("设置要识别机构名的字段").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.showField(ShowFieldStyle.CHECK.toString())
			.build();

	// --- Location recognition group ----------------------------------------------------------
	private static final PropertyDescriptor FIELD_PROPERTY_LOC = new PropertyDescriptor.Builder()
			.group("________以下是【地名识别】配置组________")
			.name("deal_fields_loc")
			.displayName("处理地名字段").description("设置要识别地名的字段").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.showField(ShowFieldStyle.CHECK.toString())
			.build();

	private static final PropertyDescriptor SELECT_LOC_TYPE_PROPERTY = new PropertyDescriptor.Builder().name("loc_type")
			.displayName("选择识别地名类型").description("选择识别地名类型").required(false).multiValues(true)
			.allowableValues("全国行政区划地名", "外国地名", "景点坐标村庄道路类地名").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
			.build();

	// --- Keyword recognition group -----------------------------------------------------------
	private static final PropertyDescriptor FIELD_PROPERTY_KEYWORD = new PropertyDescriptor.Builder()
			.group("________以下是【关键词识别】配置组________")
			.name("deal_fields_keyword")
			.displayName("处理关键词字段").description("设置要识关键词的字段").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.showField(ShowFieldStyle.CHECK.toString())
			.build();

	private static final PropertyDescriptor KEYWORD_CNT_PROPERTY = new PropertyDescriptor.Builder().name("keyword_cnt")
			.displayName("识别关键词个数").description("设置要识关键词的个数").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.build();

	private static final PropertyDescriptor KEYWORD_DICT_PROPERTY = new PropertyDescriptor.Builder().name("keyword_dict")
			.displayName("关键词自定义词典").description("关键词自定义词典").required(false)
			.addValidator(StandardValidators.StringLengthValidator.VALID)
			.build();

	// FlowFiles in which at least one entity was recognized.
	private static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder().name("识别")
			.description("抽取出文章中的实体名称").build();

	// FlowFiles in which no entity was recognized (auto-terminated by default).
	private static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder().name("未识别")
			.description("未抽取出文章中的实体名称").autoTerminateDefault(true).build();

	private List<PropertyDescriptor> descriptors;

	private Set<Relationship> relationships;

	// Field names (per recognition type) to process, parsed from the comma-separated properties.
	private Set<String> dealFieldNameSet = new HashSet<>();
	private Set<String> dealFieldOrgSet = new HashSet<>();
	private Set<String> dealFieldLocSet = new HashSet<>();
	private Set<String> dealFieldKeywordSet = new HashSet<>();

	private Properties properties;
	private ProcessContext context;
	// Guards onTrigger: only process FlowFiles after all engines initialized successfully.
	private boolean isInitOk;
	// NOTE(review): static, so this map is shared by ALL instances of this processor in the
	// flow — concurrent instances would overwrite each other's field mappings. Looks like it
	// should be an instance field; confirm before changing, as other code may rely on it.
	private static Map<PropertyDescriptor,String> MAP_OUT_FIELD_PROPERTY = new HashMap<>();

	@Override
	protected void init(final ProcessorInitializationContext context) {
		final List<PropertyDescriptor> descriptors = new ArrayList<>();

		descriptors.add(FIELD_PROPERTY_NAME);
		descriptors.add(BLACK_WORDS_PROPERTY);
		descriptors.add(WHITE_WORDS_PROPERTY);

		descriptors.add(FIELD_PROPERTY_ORG);

		descriptors.add(FIELD_PROPERTY_LOC);
		descriptors.add(SELECT_LOC_TYPE_PROPERTY);

		descriptors.add(FIELD_PROPERTY_KEYWORD);
		descriptors.add(KEYWORD_DICT_PROPERTY);
		descriptors.add(KEYWORD_CNT_PROPERTY);

		this.descriptors = Collections.unmodifiableList(descriptors);

		final Set<Relationship> relationships = new HashSet<>();
		relationships.add(SUCCESS_RELATIONSHIP);
		relationships.add(FAIL_RELATIONSHIP);
		this.relationships = Collections.unmodifiableSet(relationships);

	}

	@Override
	public Set<Relationship> getRelationships() {
		return this.relationships;
	}

	@Override
	public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
		return descriptors;
	}

	/** Parses the comma-separated value of {@code propertyKey} and adds the fields to {@code fieldSet}. */
	private void updateFieldSet(String propertyKey, Set<String> fieldSet){
		PropertyValue propertyName = context.getProperty(propertyKey);
		String dealFieldsName = propertyName.getValue();
		fieldSet.addAll(getDealFields(dealFieldsName));
	}

	/**
	 * When one of the four "fields to process" properties changes, recomputes the full set of
	 * output field names (from all four properties) and publishes them via setAppendField.
	 */
	@Override
	public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
		if (isFieldDescriptor(descriptor)) {
			MAP_OUT_FIELD_PROPERTY.put(descriptor, newValue);
			Map<String,String> mapAppendField = new HashMap<>();
			for(Map.Entry<PropertyDescriptor,String> ety : MAP_OUT_FIELD_PROPERTY.entrySet()) {
				List<Pair<String, String>> fieldInfoList = getNewFieldName(ety.getKey(), ety.getValue());
				for(Pair<String, String> fieldInfo: fieldInfoList){
					mapAppendField.put(fieldInfo.getKey(), fieldInfo.getValue());
				}
			}
			setAppendField(mapAppendField);
		}
	}

	/**
	 * Initializes the three recognition engines from processor properties, the hl.properties
	 * file and built-in defaults. Throws RuntimeException (failing the schedule) if any
	 * engine fails to initialize; sets isInitOk only when all succeed.
	 */
	@SuppressWarnings("unused")
	@OnScheduled
	public void onScheduled(final ProcessContext context) {
		isInitOk = false;
		readProperties();
		this.context = context;

		// Clear stale entries so fields removed from the configuration do not survive a
		// restart (the sets are populated with addAll and were never reset before).
		dealFieldNameSet.clear();
		dealFieldOrgSet.clear();
		dealFieldLocSet.clear();
		dealFieldKeywordSet.clear();

		// Determine which fields each recognition type should process.
		updateFieldSet("deal_fields_name", dealFieldNameSet);
		updateFieldSet("deal_fields_org", dealFieldOrgSet);
		updateFieldSet("deal_fields_loc", dealFieldLocSet);
		updateFieldSet("deal_fields_keyword", dealFieldKeywordSet);

		// Resolve resource paths: processor property > hl.properties > default.
		String propertyKeyConfigPath = "processor.hlentityname.configPath";
		String savePath = getProperty(propertyKeyConfigPath, DEFAULT_CONFIG_PATH);
		String propertyKeySegmentDictPath = "newsegment.dictPath";
		String segpath = getProperty(propertyKeySegmentDictPath, DEFAULT_SEG_PATH);
		String propertyKeyLocPath = "processor.hlentityname.locPath";
		String locPath = getProperty(propertyKeyLocPath, DEFAULT_LOC_PATH);

		// Person-name whitelist (always recognize these).
		PropertyValue property = context.getProperty("white_words");
		String whiteWords = property.getValue();
		getLogger().info("whiteWords is " + whiteWords);

		// Person-name blacklist (filter these from results).
		property = context.getProperty("black_words");
		String blackWords = property.getValue();

		// Location types to recognize.
		String locType = context.getProperty("loc_type").getValue();

		// Maximum number of keywords to extract; unlimited when unset or unparsable.
		String keywordcnt = context.getProperty("keyword_cnt").getValue();
		int keycnt = Integer.MAX_VALUE;
		if(!StringUtils.isBlank(keywordcnt)){
			try {
				keycnt = Integer.parseInt(keywordcnt);
			} catch (NumberFormatException ignored) {
				// Non-numeric value: keep the unlimited default.
			}
		}

		StringBuffer erBuffer = new StringBuffer();

		boolean ret = hlname.init(dealFieldNameSet, dealFieldOrgSet, getIdentifier(),segpath,savePath,
				whiteWords, blackWords, erBuffer);
		if(!ret){
			throw new RuntimeException("HylandaName init error!" + erBuffer.toString());
		}

		ret = hllocation.init(locPath, locType, dealFieldLocSet);
		if(!ret){
			hlname.uninit();
			throw new RuntimeException("HylandaLoc init error!");
		}

		ret = hlkeyword.init(context.getProperty("keyword_dict").getValue(), dealFieldKeywordSet, keycnt,
				segpath, savePath, getIdentifier(), erBuffer);
		if(!ret){
			// NOTE(review): hllocation is initialized by now but is never uninitialized here
			// (nor anywhere else in this class) — confirm whether HLLocationProcess needs a
			// matching uninit on this failure path.
			hlname.uninit();
			throw new RuntimeException("HylandaKeyword init error!" + erBuffer.toString());
		}
		isInitOk = true;
	}

	/**
	 * Parses the FlowFile content as a JSON object, runs all recognizers over the configured
	 * fields, writes the (possibly augmented) map back and routes to "识别" when anything was
	 * recognized, otherwise to "未识别".
	 */
	@Override
	public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

		FlowFile flowFile = session.get();
		if (flowFile == null || !isInitOk) {
			return;
		}

		try {

			Map<String, String> dataMap = sessionToMap(session, flowFile);
			if (dataMap == null || dataMap.isEmpty()) {
				// Route empty/unparseable content to the failure relationship instead of
				// returning with the FlowFile unaccounted for, which would fail the
				// session commit.
				session.transfer(flowFile, FAIL_RELATIONSHIP);
				return;
			}

			boolean flag = doRecognition(dataMap);
			flowFile = writeToSession(dataMap, session, flowFile);

			if (flag) {
				session.transfer(flowFile, SUCCESS_RELATIONSHIP);
			} else {
				session.transfer(flowFile, FAIL_RELATIONSHIP);
			}

			JSONObject msgJson = new JSONObject();
			msgJson.put("url", dataMap.get("url"));
			getLogger().billing(flowFile.getAttribute(CoreAttributes.UUID.key()), msgJson.toJSONString());

		} catch (Exception e) {
			// Log via the component logger; the FlowFile may already have been transferred
			// when the exception occurred (e.g. in billing), so do not transfer it here.
			getLogger().error("HLEntityNameProcessor onTrigger failed", e);
		}

	}

	@SuppressWarnings("unused")
	@OnAdded
	public void onAdded() {
	}

	@SuppressWarnings("unused")
	@OnShutdown
	public void onShutdown(final ProcessContext context) {
		releaseResources();
	}

	@SuppressWarnings("unused")
	@OnStopped
	public void onStopped(final ProcessContext context){
		releaseResources();
	}

	/** Tears down the recognition engines exactly once (shared by onStopped/onShutdown). */
	private void releaseResources() {
		if(isInitOk) {
			hlname.uninit();
			hlkeyword.uninit();
			isInitOk = false;
		}
	}

	/**
	 * Runs all three recognizers over the data map; returns true if any of them recognized
	 * something. All recognizers always run (|=, not short-circuit ||).
	 */
	private boolean doRecognition(Map<String, String> dataMap) {
		boolean ret = hlname.doRecognition(dataMap);
		ret |= hllocation.doLocRecognition(dataMap);
		// NOTE(review): the keyword engine's method is also named doLocRecognition —
		// presumably a copy-paste name in HLKeywordProcess; behavior assumed correct.
		ret |= hlkeyword.doLocRecognition(dataMap);

		return ret;
	}


	/**
	 * Splits a comma-separated field list (full-width '，' accepted) into a set of field names.
	 * Returns an empty set for null/empty input.
	 */
	private HashSet<String> getDealFields(String name) {

		HashSet<String> dealSet = new HashSet<>();

		if(!StringUtils.isEmpty(name)){
			name = name.replace("，", ",");

			String[] tmps = name.split(",");
			Collections.addAll(dealSet, tmps);
		}

		return dealSet;
	}

	/**
	 * Best-effort load of the external hl.properties file named by the "hl.properties.file"
	 * system property. On any failure `properties` stays empty and defaults apply.
	 */
	private void readProperties() {
		properties = new Properties();
		String propertyKeyHlPropertyFile = "hl.properties.file";
		String hlPropertyFile = System.getProperty(propertyKeyHlPropertyFile);
		if (hlPropertyFile == null) {
			getLogger().warn("system property hl.properties.file is not set; using defaults");
			return;
		}
		try (final FileInputStream fis = new FileInputStream(hlPropertyFile);
			 InputStreamReader reader = new InputStreamReader(fis, StandardCharsets.UTF_8)) {
			properties.load(reader);
		} catch (Exception e) {
			getLogger().warn("failed to load " + hlPropertyFile + "; using defaults", e);
		}
	}

	/** Resolves a setting: processor property, then hl.properties file, then the default. */
	private String getProperty(String key, String valueDefault) {
		String value = context.getProperty(key).getValue();
		if (value == null) {
			value = properties.getProperty(key);
		}
		return value != null ? value : valueDefault;
	}

	/**
	 * Reads the FlowFile content as UTF-8 JSON and flattens it into a String->String map
	 * (null values skipped, non-string values stringified). Returns null if the FlowFile is
	 * null or the content is not a JSON object.
	 */
	private Map<String, String> sessionToMap(ProcessSession session, FlowFile flowFile) {
		if (flowFile == null) {
			return null;
		} else {
			final byte[] content = new byte[(int)flowFile.getSize()];
			session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
			JSONObject data = JSONObject.parseObject(new String(content, StandardCharsets.UTF_8));
			if (data == null) {
				return null;
			} else {
				Map<String, String> map = new HashMap<>();

				for (Map.Entry<String, Object> entry : data.entrySet()) {
					if (entry.getValue() != null) {
						map.put(entry.getKey(), entry.getValue().toString());
					}
				}

				return map;
			}
		}
	}

	/**
	 * Serializes the map back into the FlowFile as UTF-8 JSON. Values that look like JSON
	 * (start with '{' or '[') are re-parsed so they are emitted as nested structures rather
	 * than escaped strings; null values are skipped.
	 */
	private static FlowFile writeToSession(Map<String, ?> contentMap, ProcessSession session, FlowFile flowFile) {
		final JSONObject data = new JSONObject();
		for (Map.Entry<String, ?> entry : contentMap.entrySet()) {
			Object value = entry.getValue();
			if (value == null) {
				continue;
			}
			String strValue = value.toString();
			String trim = strValue.trim();
			if (trim.startsWith("{") || trim.startsWith("[")) {
				try {
					data.put(entry.getKey(), JSON.parse(strValue));
				} catch (Exception ignored) {
					// Not actually valid JSON: keep it as a plain string.
					data.put(entry.getKey(), strValue);
				}
			} else {
				data.put(entry.getKey(), strValue);
			}
		}
		return session.write(flowFile, out -> out.write(data.toJSONString().getBytes(StandardCharsets.UTF_8)));
	}

	/** True when the descriptor is one of the four "fields to process" properties. */
	private boolean isFieldDescriptor(PropertyDescriptor descriptor){
		return (descriptor.equals(FIELD_PROPERTY_NAME) || descriptor.equals(FIELD_PROPERTY_ORG)
				|| descriptor.equals(FIELD_PROPERTY_LOC) || descriptor.equals(FIELD_PROPERTY_KEYWORD));
	}

	/**
	 * Maps a source-field property value to the list of (output field name, description)
	 * pairs that recognition will append, delegating name generation to the engines.
	 */
	private List<Pair<String, String>> getNewFieldName(PropertyDescriptor descriptor, String sourceField){
		List<Pair<String, String>> fieldInfoList = new ArrayList<>();
		if(!StringUtils.isBlank(sourceField)) {
			if(descriptor.equals(FIELD_PROPERTY_NAME)){
				List<String> fieldList = hlname.genNameFieldList(sourceField);
				for(String field: fieldList) {
					fieldInfoList.add(new Pair<>(field, "人名提取结果字段"));
				}
			}else if(descriptor.equals(FIELD_PROPERTY_ORG)){
				List<String> fieldList = hlname.genOrgFieldList(sourceField);
				for(String field: fieldList) {
					fieldInfoList.add(new Pair<>(field, "机构名提取结果字段"));
				}
			}else if(descriptor.equals(FIELD_PROPERTY_LOC)){
				List<String> fieldList = hllocation.genLocationField(sourceField);
				for(String field: fieldList) {
					fieldInfoList.add(new Pair<>(field, "地名提取结果字段"));
				}
			}else if(descriptor.equals(FIELD_PROPERTY_KEYWORD)){
				List<String> fieldList = hlkeyword.genKeywordField(sourceField);
				for(String field: fieldList) {
					fieldInfoList.add(new Pair<>(field, "关键词提取结果字段"));
				}
			}
		}
		return fieldInfoList;
	}
}
